package com.leo.jdkcore.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class KafkaByteMsgHandler implements KafkaMsgHandler<Integer,byte[]> {
	
	private KafkaReaderContext context;
	private ThreadPoolTaskExecutor executor;
	
	/**
	 * @param executor
	 */
	public KafkaByteMsgHandler(KafkaReaderContext context) {
		this.context = context;
		this.executor = context.getExecutor();
	}

	@Override
	public void handlerMsg(byte[] message, ConsumerRecord<Integer, byte[]> record) {
		executor.submit(new MsgProcessTask(new String(message), context));
	}

	@Override
	public boolean canPoll() {
		return true;
	}
}
